Kiểm tra và cài đặt Maven
Cài đặt bằng lệnh
sudo apt update
sudo apt install maven
Kiểm tra
mvn -version
Nếu cài đặt đúng thì sẽ trả về
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 1.8.0_442, vendor: Private Build, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "5.15.167.4-microsoft-standard-wsl2", arch: "amd64", family: "unix"
mvn archetype:generate -DgroupId=com.example -DartifactId=SparkApp -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
sau đó sẽ được thư mục có cấu trúc như sau
pom.xml <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<!-- Lưu ý: dependency spark-core_2.12 bị khai báo trùng lặp (đã có ở trên) nên được lược bỏ -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>SparkApp</artifactId>
<version>1.0-SNAPSHOT</version>
<name>SparkApp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.3</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
xin chao cac ban, xin chao Hadoop Big Data, minh xin tu gioi thieu minh ten la Luu Vinh Tuong
package spark.main;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class WordCount {

    /**
     * Entry point: reads a text file from HDFS, counts occurrences of each
     * word (non-letter characters stripped, lower-cased) and writes the
     * result back to HDFS as a single part file.
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[*]");
        sparkConf.setAppName("Spark Word Count");

        // JavaSparkContext is AutoCloseable, so try-with-resources releases it.
        try (JavaSparkContext context = new JavaSparkContext(sparkConf)) {
            String inputPath = "hdfs://vinhtuong-master:9000/input/input_1.txt";
            String outputPath = "hdfs://vinhtuong-master:9000/output/result";

            JavaRDD<String> lines = context.textFile(inputPath).cache();

            // Split each line on whitespace, normalize every token to
            // letters-only lower case, pair it with 1, then sum per word.
            JavaPairRDD<String, Integer> counts = lines
                    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                    .mapToPair(token -> new Tuple2<>(token.replaceAll("[^a-zA-Z]", "").toLowerCase(), 1))
                    .reduceByKey(Integer::sum);

            // coalesce(1) so the output lands in a single part file.
            counts.coalesce(1).saveAsTextFile(outputPath);
            System.out.println("Word Count completed. Output saved to " + outputPath);
        }
    }
}
mvn clean package
Sau khi đóng gói, chúng ta sẽ được thư mục target như sau:
spark-submit --class spark.main.WordCount --master local[*] target/SparkApp-1.0-SNAPSHOT.jar
name,age,city
Alice,30,New York
Bob,25,Los Angeles
Charlie,35,Chicago
David,40,Houston
Emma,22,San Francisco
Frank,28,Seattle
Grace,33,Boston
Henry,27,Denver
package spark.main;
import org.apache.spark.sql.*;
public class SparkSQLExample {

    /**
     * Demonstrates basic Spark SQL DataFrame operations on a CSV in HDFS:
     * show, printSchema, select, filter, groupBy/count — then persists the
     * raw frame back to HDFS as CSV.
     */
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("Spark SQL Example")
                .master("local[*]")
                .getOrCreate();

        // Read the CSV with a header row and let Spark infer column types.
        Dataset<Row> people = session.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("hdfs://vinhtuong-master:9000/input/people.csv");

        System.out.println("Du lieu ban dau:");
        people.show();

        System.out.println("Schema:");
        people.printSchema();

        System.out.println("Chon name va age:");
        people.select("name", "age").show();

        System.out.println("Loc nhung nguoi tren 25 tuoi:");
        people.filter("age > 25").show();

        System.out.println("Nhom theo so tuoi va dem so nguoi:");
        people.groupBy("age").count().show();

        // Only the original frame is persisted; the queries above are display-only.
        people.write().mode("overwrite").csv("hdfs://vinhtuong-master:9000/output/people");

        session.stop();
    }
}
mvn clean package
spark-submit --class spark.main.SparkSQLExample --master local[*] target/SparkApp-1.0-SNAPSHOT.jar
Output bên HDFS chỉ chứa dữ liệu gốc được ghi ở lệnh write cuối cùng; các thao tác select/filter/groupBy không được lưu lại (check trên code) mà chỉ show ra màn hình
Nên sẽ check file thực thi ở mục trên
Lấy dữ liệu tại đây.
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_1 {

    /**
     * Part 1: counts distinct customers, products (StockCode) and invoices
     * (InvoiceNo) in the retail CSV stored on HDFS and prints the totals.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-1")
                .master("local")
                .getOrCreate();

        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");

        // Subtract 1 because distinct() also counts the null CustomerID value
        // (rows where the customer information is missing).
        long cntCustomers = data.select("CustomerID").distinct().count() - 1;

        // Number of distinct products. (Fixed typo: was "cntProdcts".)
        long cntProducts = data.select("StockCode").distinct().count();

        // Number of distinct invoices.
        long cntInvoices = data.select("InvoiceNo").distinct().count();

        System.out.println("Number of customer distinct: " + cntCustomers);
        System.out.println("Number of product distinct: " + cntProducts);
        System.out.println("Number of invoice distinct: " + cntInvoices);

        // Release Spark resources — the original leaked the session.
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_1 target/SparkApp-1.0-SNAPSHOT.jar
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_2 {

    /**
     * Part 2: computes the percentage of rows whose CustomerID is null
     * (customers with no information) over the total number of rows.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-2")
                .master("local")
                .getOrCreate();

        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");

        // Total row count (nulls included) — denominator of the ratio.
        long cntCustomers = data.select("CustomerID").count();

        // Rows where CustomerID is null — numerator of the ratio.
        long cntCustomersNoInfor = data.select("CustomerID").filter(data.col("CustomerID").isNull()).count();

        double ratio = (double) cntCustomersNoInfor / cntCustomers * 100;
        System.out.printf("Ratio no information: %f \n", ratio);

        // Release Spark resources — the original leaked the session.
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_2 target/SparkApp-1.0-SNAPSHOT.jar
Tỉ lệ khoảng 29,93 % khách hàng không có thông tin
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_3 {

    /**
     * Part 3: total Quantity per Country, sorted descending, via Spark SQL
     * over a temp view of the retail CSV.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                // Fixed copy-paste bug: the original named this app "Part-2".
                .appName("Part-3")
                .master("local")
                .getOrCreate();

        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");

        // Register the frame as a SQL-queryable view.
        data.createOrReplaceTempView("data");
        spark.sql("select Country, sum(Quantity) as count from data group by Country order by count desc").show();

        // Release Spark resources — the original leaked the session.
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_3 target/SparkApp-1.0-SNAPSHOT.jar
output sắp xếp từ cao đến thấp => nhiều thứ 3 là EIRE với count = 142637
package spark.main;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.StructType;
public class part_4 {

    /**
     * Part 4: word count over the Description column. Each non-null
     * description is split on spaces into one row per token, then the tokens
     * are grouped and counted via Spark SQL, most frequent first.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-4")
                .master("local")
                .getOrCreate();

        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");

        // Explode each non-null Description into (number, word, lit) rows.
        data.where("Description is not null").flatMap(new FlatMapFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;

            // NOTE(review): this counter lives per deserialized task instance,
            // so "number" is only unique within a partition; the query below
            // never reads it, so that is harmless here.
            private int cnt = 0;

            @Override
            public Iterator<Row> call(Row r) throws Exception {
                // Column index 2 is assumed to be Description — TODO confirm
                // against the CSV header order.
                List<String> listItem = Arrays.asList(r.getString(2).split(" "));
                List<Row> listItemRow = new ArrayList<Row>();
                for (String item : listItem) {
                    // "lit" is a constant 1 used as the count(lit) operand.
                    listItemRow.add(RowFactory.create(cnt, item, 1));
                    cnt++;
                }
                return listItemRow.iterator();
            }
        }, Encoders.row(new StructType()
                .add("number", "integer")
                .add("word", "string")
                .add("lit", "integer")))
                .createOrReplaceTempView("data");

        spark.sql("select word, count(lit) as count from data group by word order by count desc").show();

        // Release Spark resources — the original leaked the session.
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_4 target/SparkApp-1.0-SNAPSHOT.jar
Đổi code thành asc để hiển thị từ thấp đến cao
spark.sql("select word, count(lit) as count from data group by word order by count asc").show();
Những từ trong bảng là top những từ xuất hiện ít nhất trong phần Description
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_5 {

    /**
     * Part 5: for rows where Country is "United Kingdom", sums Quantity per
     * Description and prints the products sorted by total quantity descending.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                // Fixed copy-paste bug: the original named this app "Part-4".
                .appName("Part-5")
                .master("local")
                .getOrCreate();

        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");

        // Restrict to UK rows, then register the filtered frame as a view.
        data.filter(data.col("Country").equalTo("United Kingdom")).createOrReplaceTempView("data");
        spark.sql("select Description, sum(Quantity) as count from data group by Description order by count desc").show();

        // Release Spark resources — the original leaked the session.
        spark.stop();
    }
}
mvn clean package
spark-submit --class spark.main.part_5 target/SparkApp-1.0-SNAPSHOT.jar
check tên đầy đủ trong csv là WORLD WAR 2 GLIDERS ASSTD DESIGNS
với số lượng 48326